/* when set to 1, no caching is done. 1 is the minimum value. */
#define SIRIDB_BUFFER_CACHE 64
-static int BUFFER_create_new(siridb_t * siridb, siridb_series_t * series);
-static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series);
+static int buffer__create_new(siridb_t * siridb, siridb_series_t * series);
+static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series);
+static int buffer__write_start(siridb_t * siridb, siridb_series_t * series);
+static void buffer__migrate_to_new(char * pt);
-static const uint64_t BUFFER_end = 0xffffffffffffffff;
+/* buffer__start cannot conflict with a series_id since id 0 is never used */
+static const uint32_t buffer__start = 0x00000000;
+static const uint64_t buffer__end = 0xffffffffffffffff;
/*
SEEK_SET) ||
/* write end ts */
- fwrite( &BUFFER_end,
+ fwrite( &buffer__end,
sizeof(uint64_t),
1,
siridb->buffer_fp) != 1) ? EOF : 0;
memcpy(buf, &point->ts, sizeof(uint64_t));
memcpy(buf + sizeof(uint64_t), &point->val, sizeof(qp_via_t));
- memcpy(
- buf + sizeof(uint64_t) + sizeof(qp_via_t),
- &BUFFER_end,
- sizeof(uint64_t));
+ memcpy(buf + 16, &buffer__end, sizeof(uint64_t));
return (
/* jump to position where to write the new point */
}
return (siridb->empty_buffers->len) ?
- BUFFER_use_empty(siridb, series) :
- BUFFER_create_new(siridb, series);
+ buffer__use_empty(siridb, series) :
+ buffer__create_new(siridb, series);
}
int siridb_buffer_fsync(siridb_t * siridb)
return 0;
}
+static void buffer__migrate_to_new(char * pt)
+{
+ char * npt = pt;
+ uint32_t series_id = *((uint32_t *) pt);
+ pt += sizeof(uint32_t);
+ size_t num = *((size_t *) pt);
+ pt += sizeof(size_t);
+
+ memcpy(npt, &buffer__start, sizeof(uint32_t));
+ npt += sizeof(uint32_t);
+ memcpy(npt, &series_id, sizeof(uint32_t));
+ npt += sizeof(uint32_t);
+
+ memmove(npt, pt, num * 16);
+ npt += num * 16;
+ memcpy(npt, &buffer__end, sizeof(uint64_t));
+}
+
/*
* Returns 0 if successful or -1 in case of an error.
* (signal might be raised)
FILE * fp;
FILE * fp_temp;
size_t read_at_once = 8;
- size_t num, i, j;
+ size_t num, i;
char buffer[siridb->buffer_size * read_at_once];
char * pt;
long int offset = 0;
siridb_series_t * series;
+ _Bool log_migrate = 1;
+ uint32_t buf_start, series_id;
+ uint64_t * ts;
log_info("Loading and cleanup buffer");
{
for (i = 0; i < num; i++)
{
-
pt = buffer + i * siridb->buffer_size;
- series = (siridb_series_t *)
- imap_get(siridb->series_map, *((uint32_t *) pt));
+ buf_start = *((uint32_t *) pt);
+ if (buf_start != buffer__start)
+ {
+ if (log_migrate)
+ {
+ log_warning("Buffer will be migrated");
+ log_migrate = 0;
+ }
+ buffer__migrate_to_new(pt);
+ }
+
+ pt += sizeof(uint32_t);
+ series_id = *((uint32_t *) pt);
+ pt += sizeof(uint32_t);
+
+ series = imap_get(siridb->series_map, series_id);
if (series == NULL)
{
series->bf_offset = offset;
- pt += sizeof(uint32_t);
-
- for ( j = (size_t) *pt, pt += sizeof(size_t);
- j--;
- pt += 16)
+ for (; *(ts = (uint64_t *) pt) != buffer__end; pt += 16)
{
- siridb_points_add_point(
- series->buffer,
- (uint64_t *) pt,
- (qp_via_t *) (pt + 8));
+ qp_via_t * val = (qp_via_t *) (pt + 8);
+ siridb_points_add_point(series->buffer, ts, val);
}
offset += siridb->buffer_size;
return 0;
}
+static int buffer__write_start(siridb_t * siridb, siridb_series_t * series)
+{
+ const size_t sz = sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t);
+ char buf[sz];
+
+ memcpy(buf, &buffer__start, sizeof(uint32_t));
+ memcpy(buf + sizeof(uint32_t), &series->id, sizeof(uint32_t));
+ memcpy(buf + sizeof(uint64_t), &buffer__end, sizeof(uint64_t));
+
+ /* write series ID and 0 length to buffer */
+ return (fwrite(buf, sz, 1, siridb->buffer_fp) == 1) ? 0 : -1;
+}
+
/*
* Reserve a space in the buffer for a new series. The position of this space
* in the buffer is read from siridb->empty_buffers so this list must have
* Note that an available spot must be checked before calling this function.
* This functions has undefined behavior if no spot is found.
*/
-static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series)
+static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series)
{
- const size_t sz = sizeof(uint32_t) + sizeof(size_t);
- char buf[sz];
-
series->bf_offset = (long int) slist_pop(siridb->empty_buffers);
/* jump to the correct buffer position */
return -1;
}
- memcpy(buf, &series->id, sizeof(uint32_t));
- memcpy(buf + sizeof(uint32_t), &series->buffer->len, sizeof(size_t));
-
/* write series ID and 0 length to buffer */
- if (fwrite(buf, sz, 1, siridb->buffer_fp) != 1)
+ if (buffer__write_start(siridb, series))
{
ERR_FILE
return -1;
*
* Returns 0 if successful or -1 and a signal is raised in case of an error.
*/
-static int BUFFER_create_new(siridb_t * siridb, siridb_series_t * series)
+static int buffer__create_new(siridb_t * siridb, siridb_series_t * series)
{
long int buffer_pos;
/* get file descriptor */
return -1;
}
- /* write series ID to buffer */
- if (fwrite(&series->id, sizeof(uint32_t), 1, siridb->buffer_fp) != 1)
+ /* write buffer start and series ID to buffer */
+ if (buffer__write_start(siridb, series))
{
ERR_FILE
return -1;